/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.cache.distributed.near;

import java.util.ArrayDeque;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.internal.IgniteDiagnosticAware;
import org.apache.ignite.internal.IgniteDiagnosticPrepareContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.transactions.TxDeadlock;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.tracing.MTC;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiClosure;
import org.apache.ignite.transactions.TransactionDeadlockException;
import org.apache.ignite.transactions.TransactionTimeoutException;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
import static org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings;
import static org.apache.ignite.internal.processors.tracing.MTC.support;
import static org.apache.ignite.transactions.TransactionState.PREPARED;
import static org.apache.ignite.transactions.TransactionState.PREPARING;

/**
 *
 */
public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepareFutureAdapter implements
    IgniteDiagnosticAware {
    /** */
    private int miniId;

    /** */
    private GridDhtTxMapping txMapping;

    /**
     * @param cctx Context.
     * @param tx Transaction.
     */
    public GridNearOptimisticTxPrepareFuture(GridCacheSharedContext cctx, GridNearTxLocal tx) {
        super(cctx, tx);

        assert tx.optimistic() && !tx.serializable() : tx;
    }

    /** {@inheritDoc} */
    @Override public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) {
        if (log.isDebugEnabled())
            log.debug("Transaction future received owner changed callback: " + entry);

        if (tx.remainingTime() == -1)
            return false;

        if (entry.context().isNear() &&
            owner != null && tx.hasWriteKey(entry.txKey())) {
            if (keyLockFut != null)
                keyLockFut.onKeyLocked(entry.txKey());

            return true;
        }

        return false;
    }

    /** {@inheritDoc} */
    @Override public boolean onNodeLeft(UUID nodeId) {
        boolean found = false;

        for (IgniteInternalFuture<?> fut : futures()) {
            if (isMini(fut)) {
                MiniFuture f = (MiniFuture)fut;

                if (f.node().id().equals(nodeId)) {
                    ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Remote node left grid: " +
                        nodeId);

                    e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));

                    f.onNodeLeft(e, true);

                    found = true;
                }
            }
        }

        return found;
    }

    /**
     * @param e Error.
     * @param discoThread {@code True} if executed from discovery thread.
     */
    private void onError(Throwable e, boolean discoThread) {
        try (TraceSurroundings ignored = support(span)) {
            if (e instanceof IgniteTxTimeoutCheckedException) {
                onTimeout();

                return;
            }

            if (X.hasCause(e, ClusterTopologyCheckedException.class) || X.hasCause(e, ClusterTopologyException.class)) {
                if (tx.onePhaseCommit()) {
                    tx.markForBackupCheck();

                    onComplete();

                    return;
                }
            }

            if (ERR_UPD.compareAndSet(this, null, e))
                onComplete();
        }
    }

    /** {@inheritDoc} */
    @Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) {
        if (!isDone()) {
            MiniFuture mini = miniFuture(res.miniId());

            if (mini != null) {
                assert mini.node().id().equals(nodeId);

                mini.onResult(res);
            }
            else {
                if (msgLog.isDebugEnabled()) {
                    msgLog.debug("Near optimistic prepare fut, failed to find mini future [txId=" + tx.nearXidVersion() +
                        ", node=" + nodeId +
                        ", res=" + res +
                        ", fut=" + this + ']');
                }
            }
        }
        else {
            if (msgLog.isDebugEnabled()) {
                msgLog.debug("Near optimistic prepare fut, response for finished future [txId=" + tx.nearXidVersion() +
                    ", node=" + nodeId +
                    ", res=" + res +
                    ", fut=" + this + ']');
            }
        }
    }

    /**
     * @return Keys for which {@code MiniFuture} isn't completed.
     */
    public Set<IgniteTxKey> requestedKeys() {
        compoundsReadLock();

        try {
            int size = futuresCountNoLock();

            for (int i = 0; i < size; i++) {
                IgniteInternalFuture fut = future(i);

                if (isMini(fut) && !fut.isDone()) {
                    MiniFuture miniFut = (MiniFuture)fut;

                    Collection<IgniteTxEntry> entries = miniFut.mapping().entries();

                    Set<IgniteTxKey> keys = U.newHashSet(entries.size());

                    for (IgniteTxEntry entry : entries)
                        keys.add(entry.txKey());

                    return keys;
                }
            }
        }
        finally {
            compoundsReadUnlock();
        }

        return null;
    }

    /**
     * Finds pending mini future by the given mini ID.
     *
     * @param miniId Mini ID to find.
     * @return Mini future.
     */
    private MiniFuture miniFuture(int miniId) {
        // We iterate directly over the futs collection here to avoid copy.
        compoundsReadLock();

        try {
            int size = futuresCountNoLock();

            // Avoid iterator creation.
            for (int i = size - 1; i >= 0; i--) {
                IgniteInternalFuture fut = future(i);

                if (!isMini(fut))
                    continue;

                MiniFuture mini = (MiniFuture)fut;

                if (mini.futureId() == miniId) {
                    if (!mini.isDone())
                        return mini;
                    else
                        return null;
                }
            }
        }
        finally {
            compoundsReadUnlock();
        }

        return null;
    }

    /** {@inheritDoc} */
    @Override public boolean onDone(IgniteInternalTx t, Throwable err) {
        try (TraceSurroundings ignored = MTC.support(span)) {
            if (isDone())
                return false;

            ERR_UPD.compareAndSet(this, null, err);

            return onComplete();
        }
    }

    /**
     * @param f Future.
     * @return {@code True} if mini-future.
     */
    private boolean isMini(IgniteInternalFuture<?> f) {
        return f.getClass().equals(MiniFuture.class);
    }

    /**
     * Completeness callback.
     *
     * @return {@code True} if future was finished by this call.
     */
    private boolean onComplete() {
        Throwable err0 = err;

        if ((!tx.onePhaseCommit() || tx.mappings().get(cctx.localNodeId()) == null) &&
            (err0 == null || tx.needCheckBackup()))
            tx.state(PREPARED);

        if (super.onDone(tx, err0)) {
            // Don't forget to clean up.
            cctx.mvcc().removeVersionedFuture(this);

            return true;
        }

        return false;
    }

    /**
     * Initializes future.
     *
     * @param remap Remap flag.
     * @param topLocked {@code True} if thread already acquired lock preventing topology change.
     */
    @Override protected void prepare0(boolean remap, boolean topLocked) {
        try {
            boolean txStateCheck = remap ? tx.state() == PREPARING : tx.state(PREPARING);

            if (!txStateCheck) {
                if (tx.isRollbackOnly() || tx.setRollbackOnly()) {
                    if (tx.remainingTime() == -1)
                        onDone(tx.timeoutException());
                    else
                        onDone(tx.rollbackException());
                }
                else
                    onDone(new IgniteCheckedException("Invalid transaction state for " +
                        "prepare [state=" + tx.state() + ", tx=" + this + ']'));

                return;
            }

            IgniteTxEntry singleWrite = tx.singleWrite();

            if (singleWrite != null)
                prepareSingle(singleWrite, topLocked, remap);
            else
                prepare(tx.writeEntries(), topLocked, remap);

            markInitialized();
        }
        catch (TransactionTimeoutException e) {
            onError(e, false);
        }
    }

    /**
     * @param write Write.
     * @param topLocked {@code True} if thread already acquired lock preventing topology change.
     * @param remap Remap flag.
     */
    private void prepareSingle(IgniteTxEntry write, boolean topLocked, boolean remap) {
        write.clearEntryReadVersion();

        AffinityTopologyVersion topVer = tx.topologyVersion();

        assert topVer.topologyVersion() > 0;

        txMapping = new GridDhtTxMapping();

        GridDistributedTxMapping mapping = map(write, topVer, null, topLocked, remap);

        if (isDone()) {
            if (log.isDebugEnabled())
                log.debug("Abandoning (re)map because future is done: " + this);

            return;
        }

        if (mapping.primary().isLocal()) {
            if (write.context().isNear())
                tx.nearLocallyMapped(true);
            else if (write.context().isColocated())
                tx.colocatedLocallyMapped(true);
        }

        if (keyLockFut != null)
            keyLockFut.onAllKeysAdded();

        tx.addSingleEntryMapping(mapping, write);

        cctx.mvcc().recheckPendingLocks();

        mapping.last(true);

        tx.transactionNodes(txMapping.transactionNodes());

        if (!write.context().isNear())
            checkOnePhase(txMapping);

        assert !(mapping.hasColocatedCacheEntries() && mapping.hasNearCacheEntries()) : mapping;

        proceedPrepare(mapping, null);
    }

    /**
     * @param writes Write entries.
     * @param topLocked {@code True} if thread already acquired lock preventing topology change.
     * @param remap Remap flag.
     */
    private void prepare(
        Iterable<IgniteTxEntry> writes,
        boolean topLocked,
        boolean remap
    ) {
        AffinityTopologyVersion topVer = tx.topologyVersion();

        assert topVer.topologyVersion() > 0;

        txMapping = new GridDhtTxMapping();

        Map<Object, GridDistributedTxMapping> map = new HashMap<>();

        // Assign keys to primary nodes.
        GridDistributedTxMapping cur = null;

        Queue<GridDistributedTxMapping> mappings = new ArrayDeque<>();

        boolean hasNearCache = false;

        for (IgniteTxEntry write : writes) {
            write.clearEntryReadVersion();

            GridDistributedTxMapping updated = map(write, topVer, cur, topLocked, remap);

            if (updated == null)
                // an exception occurred while transaction mapping, stop further processing
                break;

            if (write.context().isNear())
                hasNearCache = true;

            if (cur != updated) {
                mappings.offer(updated);

                updated.last(true);

                ClusterNode primary = updated.primary();

                assert !primary.isLocal() || !cctx.kernalContext().clientNode();

                // Minor optimization to not create MappingKey: on client node can not have mapping for local node.
                Object key = cctx.kernalContext().clientNode() ? primary.id() :
                    new MappingKey(primary.id(), primary.isLocal() && updated.hasNearCacheEntries());

                GridDistributedTxMapping prev = map.put(key, updated);

                if (prev != null)
                    prev.last(false);

                if (updated.primary().isLocal()) {
                    if (write.context().isNear())
                        tx.nearLocallyMapped(true);
                    else if (write.context().isColocated())
                        tx.colocatedLocallyMapped(true);
                }

                cur = updated;
            }
        }

        if (isDone()) {
            if (log.isDebugEnabled())
                log.debug("Abandoning (re)map because future is done: " + this);

            return;
        }

        if (keyLockFut != null)
            keyLockFut.onAllKeysAdded();

        tx.addEntryMapping(mappings);

        cctx.mvcc().recheckPendingLocks();

        tx.transactionNodes(txMapping.transactionNodes());

        if (!hasNearCache)
            checkOnePhase(txMapping);

        proceedPrepare(mappings);
    }

    /**
     * Continues prepare after previous mapping successfully finished.
     *
     * @param mappings Queue of mappings.
     */
    private void proceedPrepare(final Queue<GridDistributedTxMapping> mappings) {
        final GridDistributedTxMapping m = mappings.poll();

        if (m == null)
            return;

        proceedPrepare(m, mappings);
    }

    /**
     * Continues prepare after previous mapping successfully finished.
     *
     * @param m Mapping.
     * @param mappings Queue of mappings.
     */
    private void proceedPrepare(GridDistributedTxMapping m, @Nullable final Queue<GridDistributedTxMapping> mappings) {
        if (isDone())
            return;

        boolean set = cctx.tm().setTxTopologyHint(tx.topologyVersionSnapshot());

        try {
            assert !m.empty();

            final ClusterNode n = m.primary();

            long timeout = tx.remainingTime();

            if (timeout != -1) {
                GridNearTxPrepareRequest req = new GridNearTxPrepareRequest(
                    futId,
                    tx.topologyVersion(),
                    tx,
                    timeout,
                    null,
                    m.writes(),
                    m.hasNearCacheEntries(),
                    txMapping.transactionNodes(),
                    m.last(),
                    tx.onePhaseCommit(),
                    tx.needReturnValue() && tx.implicit(),
                    tx.implicitSingle(),
                    m.explicitLock(),
                    tx.taskNameHash(),
                    m.clientFirst(),
                    txMapping.transactionNodes().size() == 1,
                    tx.activeCachesDeploymentEnabled(),
                    tx.txState().recovery());

                for (IgniteTxEntry txEntry : m.entries()) {
                    if (txEntry.op() == TRANSFORM)
                        req.addDhtVersion(txEntry.txKey(), null);
                }

                // Must lock near entries separately.
                if (m.hasNearCacheEntries()) {
                    try {
                        cctx.tm().prepareTx(tx, m.nearCacheEntries());
                    }
                    catch (IgniteCheckedException e) {
                        onError(e, false);

                        return;
                    }
                }

                final MiniFuture fut = new MiniFuture(this, m, ++miniId, mappings);

                req.miniId(fut.futureId());

                add((IgniteInternalFuture)fut); // Append new future.

                if (n.isLocal()) {
                    assert !(m.hasColocatedCacheEntries() && m.hasNearCacheEntries()) : m;

                    IgniteInternalFuture<GridNearTxPrepareResponse> prepFut =
                        m.hasNearCacheEntries() ? cctx.tm().txHandler().prepareNearTxLocal(tx, req)
                        : cctx.tm().txHandler().prepareColocatedTx(tx, req);

                    prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {
                        @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) {
                            try {
                                fut.onResult(prepFut.get());
                            }
                            catch (IgniteCheckedException e) {
                                fut.onResult(e);
                            }
                        }
                    });
                }
                else {
                    try {
                        cctx.tm().sendTransactionMessage(n, req, tx, tx.ioPolicy());

                        if (msgLog.isDebugEnabled()) {
                            msgLog.debug("Near optimistic prepare fut, sent request [txId=" + tx.nearXidVersion() +
                                ", node=" + n.id() + ']');
                        }
                    }
                    catch (ClusterTopologyCheckedException e) {
                        e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));

                        fut.onNodeLeft(e, false);
                    }
                    catch (IgniteCheckedException e) {
                        if (msgLog.isDebugEnabled()) {
                            msgLog.debug("Near optimistic prepare fut, failed to sent request [txId=" + tx.nearXidVersion() +
                                ", node=" + n.id() +
                                ", err=" + e + ']');
                        }

                        fut.onResult(e);
                    }
                }
            }
            else
                onTimeout();
        }
        finally {
            if (set)
                cctx.tm().setTxTopologyHint(null);
        }
    }

    /**
     * @param entry Transaction entry.
     * @param topVer Topology version.
     * @param cur Current mapping.
     * @param topLocked {@code True} if thread already acquired lock preventing topology change.
     * @param remap Remap flag.
     * @return Mapping.
     */
    private GridDistributedTxMapping map(
        IgniteTxEntry entry,
        AffinityTopologyVersion topVer,
        @Nullable GridDistributedTxMapping cur,
        boolean topLocked,
        boolean remap
    ) {
        GridCacheContext cacheCtx = entry.context();

        List<ClusterNode> nodes;

        GridCacheEntryEx cached0 = entry.cached();

        if (cached0.isDht())
            nodes = cacheCtx.topology().nodes(cached0.partition(), topVer);
        else
            nodes = cacheCtx.topology().nodes(cacheCtx.affinity().partition(entry.key()), topVer);

        if (F.isEmpty(nodes)) {
            ClusterTopologyServerNotFoundException e = new ClusterTopologyServerNotFoundException("Failed to map " +
                "keys to nodes (partition is not mapped to any node) [key=" + entry.key() +
                ", partition=" + cacheCtx.affinity().partition(entry.key()) + ", topVer=" + topVer + ']');

            onDone(e);

            return null;
        }

        txMapping.addMapping(nodes);

        ClusterNode primary = F.first(nodes);

        assert primary != null;

        if (log.isDebugEnabled()) {
            log.debug("Mapped key to primary node [key=" + entry.key() +
                ", part=" + cacheCtx.affinity().partition(entry.key()) +
                ", primary=" + U.toShortString(primary) + ", topVer=" + topVer + ']');
        }

        // Must re-initialize cached entry while holding topology lock.
        if (cacheCtx.isNear())
            entry.cached(cacheCtx.nearTx().entryExx(entry.key(), topVer));
        else
            entry.cached(cacheCtx.colocated().entryExx(entry.key(), topVer, true));

        if (cacheCtx.isNear()) {
            if (entry.explicitVersion() == null && !remap) {
                if (keyLockFut == null) {
                    keyLockFut = new KeyLockFuture();

                    add((IgniteInternalFuture)keyLockFut);
                }

                keyLockFut.addLockKey(entry.txKey());
            }
        }

        if (cur == null || !cur.primary().id().equals(primary.id()) ||
            (primary.isLocal() && cur.hasNearCacheEntries() != cacheCtx.isNear())) {
            boolean clientFirst = cur == null && !topLocked && cctx.kernalContext().clientNode();

            cur = new GridDistributedTxMapping(primary);

            cur.clientFirst(clientFirst);
        }

        cur.add(entry);

        if (entry.explicitVersion() != null) {
            tx.markExplicit(primary.id());

            cur.markExplicitLock();
        }

        entry.nodeId(primary.id());

        if (cacheCtx.isNear()) {
            while (true) {
                try {
                    GridNearCacheEntry cached = (GridNearCacheEntry)entry.cached();

                    cached.dhtNodeId(tx.xidVersion(), primary.id());

                    break;
                }
                catch (GridCacheEntryRemovedException ignore) {
                    entry.cached(cacheCtx.near().entryEx(entry.key(), topVer));
                }
            }
        }

        return cur;
    }

    /**
     *
     */
    private void onTimeout() {
        try (TraceSurroundings ignored = MTC.support(span)) {
            if (cctx.tm().deadlockDetectionEnabled()) {
                Set<IgniteTxKey> keys = null;

                if (keyLockFut != null)
                    keys = new HashSet<>(keyLockFut.lockKeys);
                else {
                    compoundsReadLock();

                    try {
                        int size = futuresCountNoLock();

                        for (int i = 0; i < size; i++) {
                            IgniteInternalFuture fut = future(i);

                            if (isMini(fut) && !fut.isDone()) {
                                MiniFuture miniFut = (MiniFuture)fut;

                                Collection<IgniteTxEntry> entries = miniFut.mapping().entries();

                                keys = U.newHashSet(entries.size());

                                for (IgniteTxEntry entry : entries)
                                    keys.add(entry.txKey());

                                break;
                            }
                        }
                    }
                    finally {
                        compoundsReadUnlock();
                    }
                }

                add(new GridEmbeddedFuture<>(new IgniteBiClosure<TxDeadlock, Exception, Object>() {
                    @Override public GridNearTxPrepareResponse apply(TxDeadlock deadlock, Exception e) {
                        if (e != null)
                            U.warn(log, "Failed to detect deadlock.", e);
                        else {
                            e = new IgniteTxTimeoutCheckedException("Failed to acquire lock within provided timeout for " +
                                "transaction [timeout=" + tx.timeout() + ", tx=" + CU.txString(tx) + ']',
                                deadlock != null ? new TransactionDeadlockException(deadlock.toString(cctx)) : null);

                            if (!ERR_UPD.compareAndSet(GridNearOptimisticTxPrepareFuture.this, null, e)
                                && err instanceof IgniteTxTimeoutCheckedException) {
                                err = e;
                            }
                        }

                        onDone(null, e);

                        return null;
                    }
                }, cctx.tm().detectDeadlock(tx, keys)));
            }
            else {
                ERR_UPD.compareAndSet(this, null, new IgniteTxTimeoutCheckedException("Failed to acquire lock " +
                    "within provided timeout for transaction [timeout=" + tx.timeout() + ", tx=" + tx + ']'));

                onComplete();
            }
        }
    }

    /** {@inheritDoc} */
    @Override public void addDiagnosticRequest(IgniteDiagnosticPrepareContext ctx) {
        if (!isDone()) {
            for (IgniteInternalFuture fut : futures()) {
                if (!fut.isDone()) {
                    if (fut instanceof MiniFuture) {
                        MiniFuture miniFut = (MiniFuture)fut;

                        UUID nodeId = miniFut.node().id();
                        GridCacheVersion dhtVer = miniFut.m.dhtVersion();
                        GridCacheVersion nearVer = tx.nearXidVersion();

                        if (dhtVer != null) {
                            ctx.remoteTxInfo(
                                nodeId,
                                dhtVer,
                                nearVer,
                                "GridNearOptimisticTxPrepareFuture waiting for remote node response [" +
                                    "nodeId=" + nodeId +
                                    ", topVer=" + tx.topologyVersion() +
                                    ", dhtVer=" + dhtVer +
                                    ", nearVer=" + nearVer +
                                    ", futId=" + futId +
                                    ", miniId=" + miniFut.futId +
                                    ", tx=" + tx + ']');
                        }
                        else {
                            ctx.basicInfo(
                                cctx.localNodeId(),
                                "GridNearOptimisticTxPrepareFuture waiting for remote node response [" +
                                    "nodeId=" + nodeId +
                                    ", topVer=" + tx.topologyVersion() +
                                    ", dhtVer=" + dhtVer +
                                    ", nearVer=" + nearVer +
                                    ", futId=" + futId +
                                    ", miniId=" + miniFut.futId +
                                    ", tx=" + tx + ']');
                        }
                    }
                    else if (fut instanceof KeyLockFuture) {
                        KeyLockFuture keyFut = (KeyLockFuture)fut;

                        ctx.basicInfo(
                            cctx.localNodeId(),
                            "GridNearOptimisticTxPrepareFuture waiting for local keys lock [" +
                                "node=" + cctx.localNodeId() +
                                ", topVer=" + tx.topologyVersion() +
                                ", allKeysAdded=" + keyFut.allKeysAdded +
                                ", keys=" + keyFut.lockKeys + ']');
                    }
                }
            }
        }
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() {
            @Override public String apply(IgniteInternalFuture<?> f) {
                if (isMini(f)) {
                    return "[node=" + ((MiniFuture)f).node().id() +
                        ", loc=" + ((MiniFuture)f).node().isLocal() +
                        ", done=" + f.isDone() + "]";
                }
                else
                    return f.toString();
            }
        }, new P1<IgniteInternalFuture<Object>>() {
            @Override public boolean apply(IgniteInternalFuture<Object> fut) {
                return isMini(fut);
            }
        });

        return S.toString(GridNearOptimisticTxPrepareFuture.class, this,
            "innerFuts", futs,
            "tx", tx,
            "super", super.toString());
    }

    /**
     *
     */
    private static class MiniFuture extends GridFutureAdapter<GridNearTxPrepareResponse> {
        /** Receive result flag updater. */
        private static final AtomicIntegerFieldUpdater<MiniFuture> RCV_RES_UPD =
            AtomicIntegerFieldUpdater.newUpdater(MiniFuture.class, "rcvRes");

        /** Parent future. */
        private final GridNearOptimisticTxPrepareFuture parent;

        /** */
        private final int futId;

        /** Keys. */
        @GridToStringInclude
        private GridDistributedTxMapping m;

        /** Flag to signal some result being processed. */
        private volatile int rcvRes;

        /** Mappings to proceed prepare. */
        private Queue<GridDistributedTxMapping> mappings;

        /**
         * @param parent Parent.
         * @param m Mapping.
         * @param futId Mini future ID.
         * @param mappings Queue of mappings to proceed with.
         */
        MiniFuture(GridNearOptimisticTxPrepareFuture parent,
            GridDistributedTxMapping m,
            int futId,
            Queue<GridDistributedTxMapping> mappings) {
            this.parent = parent;
            this.m = m;
            this.futId = futId;
            this.mappings = mappings;
        }

        /**
         * @return Future ID.
         */
        int futureId() {
            return futId;
        }

        /**
         * @return Node ID.
         */
        public ClusterNode node() {
            return m.primary();
        }

        /**
         * @return Keys.
         */
        public GridDistributedTxMapping mapping() {
            return m;
        }

        /**
         * @param e Error.
         */
        void onResult(Throwable e) {
            if (RCV_RES_UPD.compareAndSet(this, 0, 1)) {
                if (log.isDebugEnabled())
                    log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');

                // Fail.
                onDone(e);
            }
            else
                U.warn(log, "Received error after another result has been processed [fut=" +
                    parent + ", mini=" + this + ']', e);
        }

        /**
         * @param e Node failure.
         * @param discoThread {@code True} if executed from discovery thread.
         */
        void onNodeLeft(ClusterTopologyCheckedException e, boolean discoThread) {
            if (msgLog.isDebugEnabled()) {
                msgLog.debug("Near optimistic prepare fut, mini future node left [txId=" + parent.tx.nearXidVersion() +
                    ", node=" + m.primary().id() + ']');
            }

            if (isDone())
                return;

            if (RCV_RES_UPD.compareAndSet(this, 0, 1)) {
                if (log.isDebugEnabled())
                    log.debug("Remote node left grid while sending or waiting for reply (will not retry): " + this);

                // Fail the whole future (make sure not to remap on different primary node
                // to prevent multiple lock coordinators).
                parent.onError(e, discoThread);
            }
        }

        /**
         * @param res Result callback.
         */
        void onResult(final GridNearTxPrepareResponse res) {
            if (isDone())
                return;

            if (RCV_RES_UPD.compareAndSet(this, 0, 1)) {
                if (parent.tx.remainingTime() == -1 || res.error() instanceof IgniteTxTimeoutCheckedException) {
                    parent.onTimeout();

                    return;
                }

                if (res.error() != null) {
                    // Fail the whole compound future.
                    parent.onError(res.error(), false);
                }
                else {
                    if (res.clientRemapVersion() != null) {
                        assert parent.cctx.kernalContext().clientNode();
                        assert m.clientFirst();

                        IgniteInternalFuture<?> affFut =
                            parent.cctx.exchange().affinityReadyFuture(res.clientRemapVersion());

                        parent.cctx.time().waitAsync(affFut, parent.tx.remainingTime(), (e, timedOut) -> {
                            if (parent.errorOrTimeoutOnTopologyVersion(e, timedOut))
                                return;

                            remap();
                        });
                    }
                    else {
                        parent.onPrepareResponse(m, res, m.hasNearCacheEntries());

                        // Proceed prepare before finishing mini future.
                        if (mappings != null)
                            parent.proceedPrepare(mappings);

                        // Finish this mini future.
                        onDone((GridNearTxPrepareResponse)null);
                    }
                }
            }
        }

        /**
         *
         */
        private void remap() {
            if (parent.tx.isRollbackOnly()) {
                onDone(new IgniteTxRollbackCheckedException(
                    "Failed to prepare the transaction, due to the transaction is marked as rolled back " +
                        "[tx=" + CU.txString(parent.tx) + ']'));

                return;
            }

            parent.prepareOnTopology(true, new Runnable() {
                @Override public void run() {
                    onDone((GridNearTxPrepareResponse)null);
                }
            });
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error());
        }
    }

    /**
     *
     */
    private static class MappingKey {
        /** */
        private final UUID nodeId;

        /** */
        private final boolean nearEntries;

        /**
         * @param nodeId Node ID.
         * @param nearEntries Near cache entries flag (should be true only for local node).
         */
        MappingKey(UUID nodeId, boolean nearEntries) {
            this.nodeId = nodeId;
            this.nearEntries = nearEntries;
        }

        /** {@inheritDoc} */
        @SuppressWarnings("EqualsWhichDoesntCheckParameterClass")
        @Override public boolean equals(Object o) {
            MappingKey that = (MappingKey)o;

            return nearEntries == that.nearEntries && nodeId.equals(that.nodeId);
        }

        /** {@inheritDoc} */
        @Override public int hashCode() {
            int res = nodeId.hashCode();
            res = 31 * res + (nearEntries ? 1 : 0);
            return res;
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return S.toString(MappingKey.class, this);
        }
    }
}
